/*
 * MIT License
 *
 * Copyright (c) 2023 北京凯特伟业科技有限公司
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
package com.je.connector.base.server;

import com.google.common.base.Strings;
import com.je.connector.base.codec.PacketDecoder;
import com.je.connector.base.codec.PacketEncoder;
import com.je.connector.base.exception.ServerException;
import com.je.connector.base.service.BaseService;
import com.je.connector.base.service.IServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory;

/**
 * 基于Netty的NIO TCPserver的实现
 *
 * @ProjectName: instant-message
 * @Package: com.connector.server
 * @ClassName: NettyTcpServer
 * @Description: 定义基于Netty的TCP Server的实现
 * @Author: LIULJ
 * @Version: 1.0
 * <p>Copyright: Copyright (c) 2018</p>
 */
@Getter
@Slf4j
public abstract class NettyTcpServer extends BaseService implements IServer {

    /**
     * Reactor线程名称
     */
    private static final String BOSS_THREAD_NAME = "connector_server_boss";
    /**
     * 工作组线程名称
     */
    private static final String WORKER_THREAD_NAME = "connector_server_worker";
    /**
     * 默认不启用EPOLL，因为需要操作系统支持
     */
    private static final boolean DEFAULT_USE_EPOLL = false;

    private final String host;
    private final int port;
    private boolean useEpoll = DEFAULT_USE_EPOLL;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public NettyTcpServer(int port) {
        this.host = null;
        this.port = port;
    }

    public NettyTcpServer(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void init() {
        if (started.get() == true) {
            log.error("server already started");
            throw new ServerException("server already started");
        }
    }

    @Override
    public void start() {
        if (useEpoll) {
            createEpollServer();
        } else {
            createNioServer();
        }
    }

    @Override
    public void stop() {
        if (started.get() == false) {
            log.error("server already stopped");
            throw new ServerException("server already stopped");
        }
        log.info("try shutdown {}", getClass().getSimpleName());
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().syncUninterruptibly();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully().syncUninterruptibly();
        }
        log.info("{} shutdown success", getClass().getSimpleName());
    }

    /**
     * 创建NIO
     */
    private void createNioServer() {
        EventLoopGroup bossGroup = getBossGroup();
        EventLoopGroup workerGroup = getWorkerGroup();
        if (bossGroup == null) {
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(getBossThreadNum(), getBossThreadFactory(), getSelectorProvider());
            nioEventLoopGroup.setIoRatio(100);
            bossGroup = nioEventLoopGroup;
        }

        if (workerGroup == null) {
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(getWorkThreadNum(), getWorkThreadFactory(), getSelectorProvider());
            nioEventLoopGroup.setIoRatio(getIoRate());
            workerGroup = nioEventLoopGroup;
        }
        createServer(bossGroup, workerGroup, getChannelFactory());
    }

    /**
     * 创建Epoll Server，需要操作系统支持
     */
    private void createEpollServer() {
        EventLoopGroup bossGroup = getBossGroup();
        EventLoopGroup workerGroup = getWorkerGroup();
        if (bossGroup == null) {
            EpollEventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(getBossThreadNum(), getBossThreadFactory());
            epollEventLoopGroup.setIoRatio(100);
            bossGroup = epollEventLoopGroup;
        }
        if (workerGroup == null) {
            EpollEventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(getWorkThreadNum(), getWorkThreadFactory());
            epollEventLoopGroup.setIoRatio(getIoRate());
            workerGroup = epollEventLoopGroup;
        }
        createServer(bossGroup, workerGroup, EpollServerSocketChannel::new);
    }

    /**
     * 创建服务
     *
     * @param bossGroup
     * @param workerGroup
     */
    private void createServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ChannelFactory<? extends ServerChannel> channelFactory) {
        /***
         * NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器，
         * Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。
         * 在一个服务端的应用会有2个NioEventLoopGroup会被使用。
         * 第一个经常被叫做‘boss’，用来接收进来的连接。
         * 第二个经常被叫做‘worker’，用来处理已经被接收的连接，
         * 一旦‘boss’接收到连接，就会把连接信息注册到‘worker’上。
         * 如何知道多少个线程已经被使用，如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现，
         * 并且可以通过构造函数来配置他们的关系。
         */
        this.bossGroup = bossGroup;
        this.workerGroup = workerGroup;
        try {
            /**
             * ServerBootstrap 是一个启动NIO服务的辅助启动类
             * 你可以在这个服务中直接使用Channel
             */
            ServerBootstrap b = new ServerBootstrap();
            /**
             * 这一步是必须的，如果没有设置group将会报java.lang.IllegalStateException: group not set异常
             */
            b.group(bossGroup, workerGroup);
            /***
             * ServerSocketChannel以NIO的selector为基础进行实现的，用来接收新的连接
             * 这里告诉Channel如何获取新的连接.
             */
            b.channelFactory(channelFactory);
            /***
             * 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。
             * ChannelInitializer是一个特殊的处理类，
             * 他的目的是帮助使用者配置一个新的Channel。
             * 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel
             * 或者其对应的ChannelPipeline来实现你的网络程序。
             * 当你的程序变的复杂时，可能你会增加更多的处理类到pipeline上，
             * 然后提取这些匿名类到最顶层的类上。
             */
            b.childHandler(new ChannelInitializer() {
                //每连上一个链接调用一次
                @Override
                public void initChannel(Channel ch) throws Exception {
                    initPipeline(ch.pipeline());
                }
            });

            initOptions(b);

            /***
             * 绑定端口并启动去接收进来的连接
             */
            InetSocketAddress address = Strings.isNullOrEmpty(host) ? new InetSocketAddress(port) : new InetSocketAddress(host, port);
            Channel ch = b.bind(address).sync().channel();
            if (started.compareAndSet(false, true)) {
                log.info("server started at port {}", port);
                ch.closeFuture().sync();
            } else {
                ch.close();
            }
        } catch (Exception e) {
            log.error("server start exception", e);
            throw new ServerException(e);
        }
    }

    protected void initPipeline(ChannelPipeline pipeline) {
        pipeline.addLast("decoder", getDecoder());
        pipeline.addLast("encoder", getEncoder());
        pipeline.addLast("handler", getChannelHandler());
    }

    /**
     * option()是提供给NioServerSocketChannel用来接收进来的连接。
     * childOption()是提供给由父管道ServerChannel接收到的连接，
     * 在这个例子中也是NioServerSocketChannel。
     */
    protected void initOptions(ServerBootstrap b) {
        //b.childOption(ChannelOption.SO_KEEPALIVE, false);// 使用应用层心跳

        /**
         * 在Netty 4中实现了一个新的ByteBuf内存池，它是一个纯Java版本的 jemalloc （Facebook也在用）。
         * 现在，Netty不会再因为用零填充缓冲区而浪费内存带宽了。不过，由于它不依赖于GC，开发人员需要小心内存泄漏。
         * 如果忘记在处理程序中释放缓冲区，那么内存使用率会无限地增长。
         * Netty默认不使用内存池，需要在创建客户端或者服务端的时候进行指定
         */
        b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    protected ChannelHandler getDecoder() {
        return new PacketDecoder();
    }

    public SelectorProvider getSelectorProvider() {
        return SelectorProvider.provider();
    }

    /**
     * netty 默认的Executor为ThreadPerTaskExecutor
     * 线程池的使用在SingleThreadEventExecutor#doStartThread
     * <p>
     * eventLoop.execute(runnable);
     * 是比较重要的一个方法。在没有启动真正线程时，
     * 它会启动线程并将待执行任务放入执行队列里面。
     * 启动真正线程(startThread())会判断是否该线程已经启动，
     * 如果已经启动则会直接跳过，达到线程复用的目的
     *
     * @return
     */
    protected ThreadFactory getBossThreadFactory() {
        return new DefaultThreadFactory(BOSS_THREAD_NAME);
    }

    protected ThreadFactory getWorkThreadFactory() {
        return new DefaultThreadFactory(WORKER_THREAD_NAME);
    }

    protected int getBossThreadNum() {
        return 1;
    }

    protected int getWorkThreadNum() {
        return 0;
    }

    protected int getIoRate() {
        return 70;
    }

    public ChannelFactory<? extends ServerChannel> getChannelFactory() {
        return NioServerSocketChannel::new;
    }

    /**
     * 每连上一个链接调用一次, 所有用单例
     *
     * @return
     */
    protected ChannelHandler getEncoder() {
        return new PacketEncoder();
    }

    public abstract ChannelHandler getChannelHandler();
}
